package gorabbitmq

import (
	"context"
	"encoding/json"
	"github.com/go-kratos/kratos/v2/log"
	"github.com/wagslane/go-rabbitmq"
)

// Compile-time check that *publisher satisfies IPublisher.
var _ IPublisher = (*publisher)(nil)

// IPublisher abstracts publishing a message to a RabbitMQ exchange.
type IPublisher interface {
	// Publish sends message to exchange with the given routingKey. Extra
	// publish options from the go-rabbitmq package may be supplied via options.
	// It returns a non-nil error when the message could not be published.
	Publish(ctx context.Context, exchange, routingKey string, message interface{}, options ...func(*rabbitmq.PublishOptions)) error
}

// publisher is the IPublisher implementation declared in this file. It embeds
// *rabbitmq.Publisher, so the embedded publisher's methods are promoted onto it.
type publisher struct {
	*rabbitmq.Publisher
}

// NewRabbitMQPublisher creates an IPublisher backed by a go-rabbitmq publisher
// opened on conn with the library's logging option enabled. The constructor is
// meant to be registered with dependency injection.
//
// It returns the error from rabbitmq.NewPublisher unchanged when the
// underlying publisher cannot be created.
func NewRabbitMQPublisher(conn *rabbitmq.Conn) (IPublisher, error) {
	inner, err := rabbitmq.NewPublisher(conn, rabbitmq.WithPublisherOptionsLogging)
	if err != nil {
		return nil, err
	}

	wrapped := &publisher{Publisher: inner}
	return wrapped, nil
}

// Publish sends message to exchange, using routingKey as the only routing key.
//
// A []byte message is published as-is; any other value is encoded with
// json.Marshal first, and a marshalling error is returned without publishing.
//
// Three options are always appended after the caller-supplied options: the
// target exchange, the "application/json" content type, and persistent
// delivery. Because they come last they are applied after the caller's options
// (presumably overriding conflicting ones — confirm against go-rabbitmq).
// Note that the content type is set to JSON even for raw []byte payloads.
//
// A publish failure is logged and the error is returned to the caller.
func (p *publisher) Publish(ctx context.Context, exchange, routingKey string, message interface{}, options ...func(*rabbitmq.PublishOptions)) (err error) {
	var data []byte
	switch v := message.(type) {
	case []byte:
		data = v
	default:
		data, err = json.Marshal(v)
		if err != nil {
			return err
		}
	}

	// Build the final option list in a fresh slice. Appending directly to the
	// variadic parameter could write into the caller's backing array when the
	// caller passed `opts...` with spare capacity, which is a data race if the
	// same slice is shared across concurrent Publish calls.
	opts := make([]func(*rabbitmq.PublishOptions), 0, len(options)+3)
	opts = append(opts, options...)
	opts = append(opts,
		// Bind the target exchange.
		rabbitmq.WithPublishOptionsExchange(exchange),
		// Content type / encoding of the published payload.
		rabbitmq.WithPublishOptionsContentType("application/json"),
		// Persistent delivery so messages can be recovered after e.g. a restart.
		rabbitmq.WithPublishOptionsPersistentDelivery,
	)

	err = p.Publisher.PublishWithContext(
		ctx,
		data,
		[]string{routingKey},
		opts...,
	)
	if err != nil {
		// log.Errorw takes key/value pairs, so the message needs its own key;
		// otherwise the message text would be used as the key for the error.
		log.Errorw(
			log.DefaultMessageKey, "publish mq error",
			"err", err.Error(),
			"topic", routingKey,
			"data", string(data),
		)
	}
	return err
}
